Планировщик бизнес-процессов
Для реализации в следующей версии BPE 5.0 примитивов #gateway и #subProcess (ISO/IEC-19510), нам понадобится что-то вроде скедулера, а также реализация древовидной истории процесса. Текущая версия BPE, как вы знаете, однопоточная, это означает, что для реализации паралельных вычислений вы должны явно стартовать другой процесс из определенной задачи, и он уже будет управлятся напрямую своим Erlang-процессом, но тоже иметь свою линейную историю. Связывать и ждать завешение вам тоже нужно самому и изобретать для этого свой протокол. Почему сразу не написали ISO c субпроцессами? Потому что хотели показать, что и без гейтвеев можно строить системы, показать то что важно, но теперь пришла пора полностью реализовать стандарт. Поэтому сразу — новые рекорды и алгоритм.
#gateway
Согласно стандарту гейтвеи #gateway в общем случае это мультиплексоры или демультиплексоры с разной логикой: "И" (паралельное выполнение), "ИЛИ" (однопоточное условное выполнение, условный оператор), а также для редюсеров: ждать "ВСЕ" ребра, "ОДНО" или определённое "ПОДМНОЖЕСТВО" рёбер.
-type gate() :: none | exclusive | parallel | inclusive | complex | event.
#subProcess
#subProcess же просто обычная задача, которая создает дополнительную группировку, в реальности все будет происходить в том же Эрланг процессе, новый эрланг процесс создаваться не будет.
Алгоритм
Начнем с примера. Представим себе граф Х, в которм вершины B, C, F являются паралельным многопоточным сплитом (fork), а вершины G и J являются паралельными демультимплексорами, которые ожидают завершение всех входящих вершин (join), т.е. их наличие в истории процесса.
E ----------- H
4 / 7 /
C / 8
1 2 / \ 5 9 /
A ---- B F --- I / [ Граф X ]
3 \ \ 11 \ 10
D - G --- J ----- K
6 12 13
NOTE: обратите внимание, что в общем случае у процесса может быть несколько точек входа, это означает, что на начальном этапе в планировщие уже присутствует несколько паралельных потоков выполнения которые будут квантоваться линейной последовательностью событий самого Erlang-процесса.
Итак, в примере у нас старт с А(1), она же будет первым потоком планировщика, адреса других потоков планировщика (в разное время): A/B(2,3), A/B/C(4,5), A/B/C/F(9,11), т.е. кортежи составленые из разветвлений.
Первый шаг, начинаем процесс и выполняем ребро 1, которе нам дает единственный поток выполнения А, событие #beginEvent. Сразу перед вторым шагом делаем следующее: видим, что ребро 1 нас привело в состояние B, из которого исходят паралельные ребра 2 и 3 поэтому мы их еще до выполнения второго шага, ложим в планировщик, а ребро 1 и поток выполнения А удаляем, так как после этой точки уже будут другие потоки управления. На втором шаге у нас два потока выполнения: A/B(2) и A/B(3). Выбираем первый попавшийся и делаем шаг по этому ребру, например, A/B(2). Тут, поскольку стоит опять гейтвей, мы еще перед выполнением третьего шаго изменяем потоки планировщика, и, точно также как на первом ребре, мы удаляем A/B(2) поток и на его место создаем два потока A/B/C(4) и A/B/C(5), поток A/B(3) при этом не трогаем.
Внутри на каждом шаге мы храним курсоры потоков выполнения в циклическом массиве и крутим его индекс на каждом шаге, одновременно обновляя курсоры. В приведенном линеаризированом трейсе покажем как упаковывается древовидная история процесса.
Шаг 0: BEGIN, [BEGIN->1], 1
Шаг 1: [1->2,3], 1
Шаг 2: [2->4,5;3], 3
Шаг 3: [4;5;3->6], 1
Шаг 4: [4->7;5;6], 2
Шаг 5: [7;5->9,11;6], 4
Шаг 6: [7;9;11], 1
Шаг 7: [7->8;9;11], 2
Шаг 8: [8;9->10;11], 3
Шаг 9: [8;11->10;12], 3
Шаг 10: [10;12], 1
Шаг 11: [12], 1
Шаг 12: [13], 1
Шаг 13: [], 0
В этом CSV-логе первый элемент -- это номер выполнившегося ребра, второй -- это состояние планировщика (список его потоков разделенных точкой с запятой), третий -- это указатель на текущий поток выполнения.
На join элементах нужно гарантировать, что входщие ребра необходимые для пропихивания процесса вперед, находятся в истории процесса (это делается за 1 get по индексу истории процесса для каждого входящего ребра). К счатью в нашем примере алгоритм линеаризировал автоматически правильную последовательность без рейсов и блокировок.
При наличие циклов в графе, трейс процеса потенциально может иметь бесконечную длину. Алгоритм позволяет создать план выполнения даже бесконечных процессов при отсутствии свободных переменных в логике (функции) процесса, в этом случае недетерминизм.
Имплементация
Представление графа Х (описание бизнес-процеса):
start() ->
R=[{r1, a,b},
{r2, b,c},
{r3, b,d},
{r4, c,e},
{r5, c,f},
{r6, d,g},
{r7, e,h},
{r8, h,j},
{r9, f,i},
{r10,i,j},
{r11,f,g},
{r12,g,j},
{r13,j,k}],
N=[{a,[],[r1]},
{b,[r1],[r2,r3]},
{c,[r2],[r4,r5]},
{d,[r3],[r6]},
{e,[r4],[r7]},
{f,[r5],[r9, r11]},
{g,[r6,r11],[r12]},
{j,[r8,r10,r12],[r13]},
{i,[r9],[r10]},
{h,[r7],[r8]},
{k,[r13],[]}],
walk(R, N, {0, [r1], 1, []}).
Рекурсор процесса:
walk(_,_, {_,[],_, Visited}) -> lists:reverse(Visited);
walk(R,N, State) -> walk(R,N, step(R, N, State)).
Прототип планировщика на 10 строчек:
0 step(Rs, Ns, {Step, Threads, Pointer, Visited}) ->
1 CurrentR = lists:keyfind(lists:nth(Pointer, Threads), 1, Rs),
2 CurrentN = lists:keyfind(element(3, CurrentR), 1, Ns),
3 Rids = element(2, CurrentN),
4 NewStep = Step + 1,
5 NewVis = [CurrentR | Visited],
6 Check = lists:all(fun(Rid) -> lists:member(Rid, [element(1,R) || R <- NewVis]) end, Rids),
7 Inserted = case Check of false -> []; true -> element(3, CurrentN) end,
8 NewThreads = lists:sublist(Threads, Pointer-1) ++ Inserted ++ lists:nthtail(Pointer, Threads),
9 NewPointer = if Pointer == length(Threads) -> 1; true -> Pointer + length(Inserted) end,
10 {NewStep, NewThreads, NewPointer, NewVisited}.
Тут можно почитать про то как это же делается в Camunda.